package com.ruc.dbiir.rest.datasource;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.ruc.dbiir.rest.utils.Config;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Reads (consumes) rate messages from a Kafka topic and aggregates them per message key.
 *
 * <p>Each message value is parsed as an integer rate. Consecutive messages that share a
 * key are summed into {@code Config.ACC_VALUE}; when the key changes, the finished sum is
 * offered to {@code Config.NUMQUEUE} and a new sum is started for the new key.
 *
 * <p>NOTE(review): the running state lives in static {@code Config} fields that are
 * updated here without synchronization - confirm that only one thread calls
 * {@link #consume()} at a time.
 *
 * <p>Created: May 26, 2018 1:27:41 PM
 *
 * @author mark
 */


//@Component(value = "readKafka")
public class ReadKafka {

	/** ZooKeeper address used by the no-argument constructor. */
	private static final String DEFAULT_ZOOKEEPER_CONNECT = "202.112.113.71:2181";

	/** Consumer group id used by the no-argument constructor. */
	private static final String DEFAULT_GROUP_ID = "jd-group";

	private final ConsumerConnector consumer;


	/**
	 * Creates a consumer that uses the default ZooKeeper address and consumer group.
	 */
	public ReadKafka() {
		this(DEFAULT_ZOOKEEPER_CONNECT, DEFAULT_GROUP_ID);
	}


	/**
	 * Creates a consumer for the given ZooKeeper ensemble and consumer group.
	 *
	 * @param zookeeperConnect value for {@code zookeeper.connect}, e.g. {@code "host:2181"}
	 * @param groupId          value for {@code group.id}, the consumer group to join
	 * @throws NullPointerException if either argument is {@code null}
	 *                              ({@link Properties#put} rejects null values)
	 */
	public ReadKafka(String zookeeperConnect, String groupId) {
		Properties props = new Properties();
		// ZooKeeper connection
		props.put("zookeeper.connect", zookeeperConnect);

		// group.id identifies the consumer group
		props.put("group.id", groupId);

		// ZooKeeper session timeout, in milliseconds
		props.put("zookeeper.session.timeout.ms", "6000000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		// "smallest": start from the earliest available offset when the group has none stored
		props.put("auto.offset.reset", "smallest");
		// Serializer class.
		// NOTE(review): serializer.class looks like a producer-side setting; presumably the
		// consumer ignores it - confirm before removing.
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		ConsumerConfig config = new ConsumerConfig(props);
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
	}


	/**
	 * Consumes the {@code Config.TOPIC_RATE} topic with a single stream and feeds every
	 * message into the per-key accumulation.
	 *
	 * <p>The loop runs for as long as the stream iterator reports more messages.
	 * Messages whose value is not a valid integer are reported on {@code System.err} and
	 * skipped, so that one malformed message does not stop consumption.
	 */
	public void consume() {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		// one stream (consumer thread) for the rate topic
		topicCountMap.put(Config.TOPIC_RATE, Integer.valueOf(1));

		StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
		StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

		Map<String, List<KafkaStream<String, String>>> consumerMap =
				consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

		// consume the rate data
		KafkaStream<String, String> stream = consumerMap.get(Config.TOPIC_RATE).get(0);
		ConsumerIterator<String, String> it = stream.iterator();

		while (it.hasNext()) {
			MessageAndMetadata<String, String> mam = it.next();
			String rateStr = mam.message(); // message value
			String key = mam.key();         // message key, may be null

			int rate;
			try {
				rate = Integer.parseInt(rateStr);
			} catch (NumberFormatException e) {
				// parseInt also rejects null; skip the message instead of aborting the loop
				System.err.println("Skipping non-integer rate message, key=" + key
						+ " value=" + rateStr);
				continue;
			}

			accumulate(key, rate);
		}
	}


	/**
	 * Releases the underlying Kafka consumer connector.
	 */
	public void shutdown() {
		consumer.shutdown();
	}


	/**
	 * Adds one rate sample to the running per-key sum held in {@code Config}.
	 *
	 * <p>Known limitation (from the original author): the sum of the last key group is
	 * only emitted once a message with a different key arrives.
	 *
	 * <p>NOTE(review): {@code "initial"} is the sentinel for "no key seen yet"; a real
	 * message key equal to {@code "initial"} would never be flushed - confirm producers
	 * cannot send it.
	 *
	 * @param key  key of the current message, may be {@code null}
	 * @param rate parsed rate value of the current message
	 */
	private static void accumulate(String key, int rate) {
		// first message ever: adopt its key and start summing
		if ("initial".equals(Config.PRE_KEY)) {
			Config.PRE_KEY = key;
			Config.ACC_VALUE += rate;
			return;
		}

		// null-safe comparison: a message without a key must not raise NullPointerException
		boolean sameKey = (key == null) ? Config.PRE_KEY == null : key.equals(Config.PRE_KEY);
		if (sameKey) {
			Config.ACC_VALUE += rate;
			return;
		}

		// key changed: publish the finished sum, then start a new one for the new key
		Config.NUMQUEUE.offer(String.valueOf(Config.ACC_VALUE));
		System.out.println("KEY = :"+ Config.PRE_KEY+" ACC_VALUE:="+Config.ACC_VALUE);
		Config.PRE_KEY = key;
		Config.ACC_VALUE = rate;
	}


	/**
	 * Starts a consumer with the default settings and shuts it down when consumption ends,
	 * whether normally or through an exception.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		ReadKafka reader = new ReadKafka();
		try {
			reader.consume();
		} finally {
			reader.shutdown();
		}
	}
}
